【小ネタ】[Amazon Kinesis Data Firehose] Transform source records with AWS Lambda でレコード毎に改行を追加する
1 はじめに
IoT事業部の平内(SIN)です。
ストリームに流れるレコードを、そのままS3へ保存するような場合、Amazon Kinesis Data Firehoseが非常に便利に利用されると思います。そして、レコードが、JSONのようなテキスト形式だった場合に、レコード毎の区切りが分かるように改行等を入れて保存すると、利用しやすいかも知れません。
このことから、IoT Coreのルールでは、Amazon Kinesis Data Firehoseへ送る際、改行等のセパレータが指定可能になっています。
今回は、Amazon Kinesis Data Streamsを入力ソースとしたAmazon Kinesis Data Firehoseで、S3へ保存する前に、レコード毎に改行を挿入するLambdaを作成してみました。
レコードデータは、手元のMacからboto3で送信しています。
2 Lambda
(1) コード
Lamnbdaのコードは、以下の通りです。
Lambdaには、複数のレコードが到着しますが、レコード毎にデコードして、改行を追加し、再びエンコードします。Lambdaのレスポンスとしては、受信時に取得したRecordId と result を付与したものになります。
今回は、改行の挿入だけですが、ここでは、レコードの変換を自由に行うことができます。また、レコードがJSON形式の場合は、一旦、パースして、JSONとしてその内容を扱うことも可能です。
import json import base64 def lambda_handler(event, context): results = [] records = event["records"] for record in records: recordId = record["recordId"] data = record["data"] # デコード decoded_data = base64.b64decode(data).decode("utf-8") # JSONで処理する場合は、jsonへのパースを行う #payload = json.loads(decoded_data) #decoded_data = json.dumps(payload) # 改行を追加する decoded_data = decoded_data + '\n'; # エンコード data = base64.b64encode(decoded_data.encode()) results.append({ "result":"Ok", "recordId":recordId, "data":data }) return { "records":results }
(2) タイムアウト
Lambdaのタイムアウトは、最低1分としないと、アラートが出てました。
(3) アクセス権
Lambdaには、特にアクセス権を追加する必要はありません。
3 Kinesis Data Firehose
(1) Transform source records with AWS Lambda
Source record transformationをEnableとし、作成したLambdaを設定します。
(2) Amazon S3 destination
送り先は、S3バケットとし、今回は、バッファサイズ、1MiB、インターバルは、60secとしています。
4 動作確認
動作確認のために使用したコードは以下の通りです。手元のMacから20レコードほど送信しました。
import json import random import datetime from boto3.session import Session profile = 'developer' session = Session(profile_name = profile) kinesis = session.client('kinesis') stream_name = 'test' for i in range(20): data = { "index": i, "timestamp": datetime.datetime.now().isoformat(sep=' ', timespec='milliseconds'), "message": "hello" } try: response = kinesis.put_record( StreamName = stream_name, PartitionKey = str(random.randrange(0,100)), Data = json.dumps(data) ) except Exception as e: print("Exception: {}", e.args) print('put_record SequenceNumber:{}'.format(response['SequenceNumber']))
S3バケットに保存されたデータです。
中身を確認すると、各レコードごと改行されていることが確認できます。
5 最後に
今回は、Amazon Kinesis Data Firehoseで、Lambdaによるレコードへの改行追加を行ってみました。
Pythonによる、Transform source records with AWS Lambdaのコードスニペットとして、個人的には重宝しそうです。